package com.shujia.ads

import com.shujia.util.FlinkTool

object AdsProvinceIndex extends FlinkTool{
  /**
    * 需要在子类实现抽象方法
    *
    */
  override def run(args: Array[String]): Unit = {

    table.executeSql(
      """
        |
        |insert into gma_ads.ads_province_index
        |select
        |a.province_id,
        |b.area_code,
        |b.name as province_name,
        |b.region_name,
        |a.order_amount as order_amount,
        |a.order_count as order_count,
        |a.dt
        |from
        |(select
        | province_id,
        | date_format(pay_time,'yyyy-MM-dd') as dt,
        | count(1) as order_count,
        | sum(sku_num*order_price) as order_amount
        |from
        |gma_dwd.dwd_paid_order_detail /*+ OPTIONS('scan.startup.mode'='earliest-offset') */
        |where pay_time is not null
        |group by
        |province_id,date_format(pay_time,'yyyy-MM-dd')) as a
        |join
        |gma_dim.dim_kafka_mysql_region_cdc as b
        |on a.province_id =b.id
        |
        |
      """.stripMargin)
  }
}
